Push down min/max/count to Iceberg #6622
Conversation
```java
public void update(DataFile file) {
  if (!isNull) {
    R value = aggregate.eval(file);
    update(value);
```
I removed the null check and did it in update. e.g. MaxAggregate.update
Was this needed for correctness? I'm not sure I understand why you'd need to move it. The NullSafeAggregator was intended to avoid needing to handle null in the update methods. It also used to keep isNull set correctly so that any aggregate that is null would stop calling eval and updating.
If one data file evaluates to null, I think we still want to evaluate the rest of the data files. For example,
```sql
CREATE TABLE test (id LONG, data INT) USING iceberg PARTITIONED BY (id);
INSERT INTO TABLE test VALUES (1, null), (1, null), (2, 33), (2, 44), (3, 55), (3, 66);
SELECT max(data) FROM test;
```
For max(data), the first data file evaluates to null, but I think we still want to evaluate the rest of the data files to get the max value 66.
I am not sure I understand the purpose of isNull in this class then. Looks like we init it and never change?
I see. In that case, I think we need to change isNull to hasValue and return a boolean from update(R).
The intent here was to signal when there is not enough information to produce a value. When there isn't, then the result value should be null, and we can skip pulling values out of rows or data files because we don't have enough information.
For example, if we are processing 3 Parquet files and 1 Avro file, the Avro file may not have a max value. Rather than giving a partial max from the 3 Parquet files, we need update(avroFile) to return hasValue = false so that we stop aggregating.
You're right that this needs to change from my original version, which assumed any null value signaled that there was no maximum. If we know that a file contains only null values, then we can skip it even if it doesn't have an upper bound. Similarly, if we get a null value from a row then we can skip it.
I changed isNull to hasValue. I have also added a canPushDown flag in BoundAggregate to indicate whether this aggregate can be pushed down. I needed a way to differentiate the two kinds of null: is the null due to stats not being available (e.g. a complex type), or because the value itself is null? So I added this flag.
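A minimal sketch of the hasValue idea discussed above. The class and method names here are illustrative, not Iceberg's actual API: a null input value is skipped and aggregation continues, while missing stats invalidate the whole aggregate.

```java
// Illustrative sketch, not Iceberg's API: a max aggregator that distinguishes
// "value is null" (skip it and keep aggregating) from "stats unavailable"
// (the aggregate can no longer produce a correct answer).
class MaxLongAggregator {
  private Long max = null;
  private boolean hasValue = true;

  // Returns false once the aggregate is invalid, e.g. after seeing a file
  // without an upper bound in its metrics, so callers can stop aggregating.
  boolean update(Long value, boolean statsAvailable) {
    if (!statsAvailable) {
      hasValue = false;
    } else if (value != null && (max == null || value > max)) {
      max = value;
    }
    return hasValue;
  }

  boolean hasValue() {
    return hasValue;
  }

  Long result() {
    return hasValue ? max : null;
  }
}
```

With the values from the SQL example above (null, null, 33, 44, 55, 66), this continues past the null-only data file and still produces 66.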
@rdblue Could you please take a look when you have time? I am not so sure if I added you as co-author correctly. It looks suspicious.
```java
public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false;

// Controls whether to push down aggregate (MAX/MIN/COUNT) to Iceberg
public static final String AGGREGATE_PUSH_DOWN_ENABLED = "spark.sql.iceberg.aggregate_pushdown";
```
We typically separate words with - rather than _. I think this should also match the other property. How about spark.sql.iceberg.aggregate-push-down-enabled?
```java
        .map(agg -> SparkAggregates.convert(agg))
        .collect(Collectors.toList());
    aggregateEvaluator = AggregateEvaluator.create(schema, aggregates);
  } catch (Exception e) {
```
Can you make this exception more specific? What might be thrown here?
```java
        .collect(Collectors.toList());
    aggregateEvaluator = AggregateEvaluator.create(schema, aggregates);
  } catch (Exception e) {
    LOG.info("Can't push down aggregates: " + e.getMessage());
```
This shouldn't swallow the exception by only printing the message. Instead, this should pass the exception to the logger so that it gets printed with the full stack trace, suppressed exceptions, and causes.
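As an illustrative sketch of this point (using java.util.logging so it runs standalone; the PR itself uses SLF4J, where the equivalent call is `LOG.info("Skipped aggregate pushdown", e)`), passing the exception object to the logger preserves the stack trace, suppressed exceptions, and causes:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch only: demonstrates attaching the Throwable to the log record instead
// of concatenating e.getMessage() into the message string.
class PushdownLogging {
  static final Logger LOG = Logger.getLogger(PushdownLogging.class.getName());

  static void tryPushdown() {
    try {
      // placeholder for aggregate conversion failing
      throw new IllegalArgumentException("unsupported aggregate");
    } catch (RuntimeException e) {
      // The throwable travels with the log record, so the handler can print
      // the full stack trace and causes, not just the message text.
      LOG.log(Level.INFO, "Skipped aggregate pushdown", e);
    }
  }
}
```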
```java
List<ManifestFile> manifests = getSnapshot().allManifests(table.io());

for (ManifestFile manifest : manifests) {
```
This is going to read all table metadata, which could be really large. Instead, I think this should use scan planning to get the data files. That will allow this to apply filters and skip a lot of data, and it would also parallelize manifest scanning using a ParallelIterator. You'd need to request column stats, or the tasks will be returned without stats copied.
Changed to use scan planning to get the data files. Please take a look to see if it's OK.
```java
  }
}

Object[] res = aggregateEvaluator.result();
```
I think it would be good to check whether the aggregates are non-null and only return if they are valid. Otherwise, this could return different results depending on whether stats are present in the file metadata. To avoid that, we can just detect whether we have a result and abort pushdown if we don't.
I have added the null check, but after a second thought, I removed it.
If one of the columns has all null values, then the max or min is also null. We probably still want to push down the aggregate.
I have checked the Metrics mode to disable push down if the mode doesn't have stats. I have also disabled push down for complex types. I am wondering if it's safe without the null check here. If not, I will put back.
@huaxingao, I think the fix is to have a flag in the aggregator that can return whether or not the value is valid. That's what I wanted to use null for here, but you're right that there are cases where the aggregate is valid and its value is null because there are no non-null values.
If we keep track of isValid in each aggregator, then the AggregateEvaluator can have a similar method to return whether all aggregates are valid. Then we would just abort the aggregation if any value is not known. We can also have an override flag for when you want the closest answer, even if it isn't guaranteed to be correct.
FYI @aokolnychyi
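The isValid suggestion above could be sketched as follows. All names (SketchAggregateEvaluator, resultOrAbort, allowApproximate) are hypothetical stand-ins, not Iceberg's classes: each aggregator reports its own validity, and the evaluator only returns results when every aggregate is valid, unless approximate answers were explicitly requested.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of evaluator-level validity tracking.
class SketchAggregateEvaluator {
  interface Aggregator {
    void update(Long value, boolean statsAvailable);
    boolean isValid();
    Long result();
  }

  private final List<Aggregator> aggregators = new ArrayList<>();
  private final boolean allowApproximate; // override flag for "closest answer"

  SketchAggregateEvaluator(boolean allowApproximate) {
    this.allowApproximate = allowApproximate;
  }

  void add(Aggregator agg) {
    aggregators.add(agg);
  }

  boolean allValid() {
    return aggregators.stream().allMatch(Aggregator::isValid);
  }

  // Abort (return null) if any aggregate is not known, unless approximate
  // answers were requested.
  List<Long> resultOrAbort() {
    if (!allValid() && !allowApproximate) {
      return null;
    }
    List<Long> results = new ArrayList<>();
    for (Aggregator agg : aggregators) {
      results.add(agg.result());
    }
    return results;
  }
}
```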
```java
  return true;
}

private Snapshot getSnapshot() {
```
Iceberg does not use get in method names. There's probably a better name here, like readSnapshot().
```java
// maybe changed, so disable push down aggregate.
if (Integer.parseInt(map.getOrDefault("total-position-deletes", "0")) > 0
    || Integer.parseInt(map.getOrDefault("total-equality-deletes", "0")) > 0) {
  LOG.info("Cannot push down aggregates when row level deletes exist.)");
```
Log messages should be more direct. In this case, the main information is that the aggregate pushdown is skipped. The reason why is secondary, but important. Rather than making this a statement that needs to be interpreted ("X is not possible" -> "Iceberg didn't do X"), this should be "Skipped aggregate pushdown: detected row level deletes".
In addition, I think that there are cases where you'd still want an answer from metadata. First, there may not be any matching delete files, so it could be safe. Second, it may be better to get an approximate answer. I think this should be handled using another setting that enables/disables aggregate pushdown when deletes are present. And for the first case, pushdown should only fail if there were delete files returned for at least one FileScanTask (as returned by planFiles called above).
I also support the idea of checking if any matching tasks have deletes and using that instead of relying on generic snapshot metadata.
I have changed the code to check the deletes in the tasks and abort the push down if deletes are present.
I also agree it may be better to introduce another setting to get an approximate number if there are deletes. Probably we can do this in a follow up PR.
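A hedged sketch of that per-task check. FileScanTask here is a simplified stand-in interface, not Iceberg's; in the real API the tasks come from planFiles() and each carries its matching delete files:

```java
import java.util.List;

// Sketch: abort pushdown only when a planned task actually carries delete
// files, rather than consulting snapshot-level summary counters.
class DeleteCheck {
  interface FileScanTask {
    List<String> deletes(); // paths of delete files attached to this task
  }

  static boolean canPushDown(List<FileScanTask> tasks) {
    for (FileScanTask task : tasks) {
      if (!task.deletes().isEmpty()) {
        // "Skipped aggregate pushdown: detected row level deletes"
        return false;
      }
    }
    return true;
  }
}
```

A snapshot summary can report deletes that don't overlap any matching task, so checking per task avoids aborting pushdown unnecessarily.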
```java
// be used to calculate min/max/count, will enable aggregate push down in next phase.
// TODO: enable aggregate push down for partition col group by expression
if (aggregation.groupByExpressions().length > 0) {
  LOG.info("Group by aggregation push down is not supported yet.");
```
Error message: don't use "yet". It is simply not supported.
It should be possible to do this, but I understand skipping it in the first PR.
I'd love to take a look at this PR on Monday too.
```java
private final List<BoundAggregate<?, ?>> aggregates;

private AggregateEvaluator(List<BoundAggregate<?, ?>> aggregates) {
  ImmutableList.Builder<BoundAggregate.Aggregator<?>> aggregatorsBuilder =
```
nit: What about a direct import for Aggregator to shorten the lines?
```java
ImmutableList.Builder<Aggregator<?>> aggregatorsBuilder = ImmutableList.builder();
```
```java
AggregateEvaluator aggregateEvaluator;
try {
  List<BoundAggregate<?, ?>> aggregates =
```
I'd consider adding another method to SparkAggregates to convert an entire Aggregation. That way, we will be able to simplify this block.
Great work, @huaxingao! I am looking forward to this being merged.
@aokolnychyi @rdblue Thank you very much for your review! I have addressed most of the comments. Will finish the rest at a later time.
Force-pushed from ed071e8 to d659a51
```java
  }

  @Test
  public void testAggregateNotPushDownForStringType() {
```
I think that this test depends on the default metrics mode for string, not on the type being a string itself. If the metrics mode were 'full' so that values aren't truncated, then it would work. You may want to set the metrics mode explicitly for this test and test the case where the metrics are not truncated.
I actually set the metrics mode to full at the end of the test and tested that the aggregate was pushed down.
I'm glad to hear that there's a test for full, but the correctness of this test shouldn't rely on the default metrics mode. I think it should explicitly set the string column's mode to truncate.
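One way to pin the mode explicitly is the per-column metrics property, `write.metadata.metrics.column.<column>`. A sketch, assuming the string column under test is named `data` (the real test's column name may differ):

```sql
-- Pin the string column's metrics mode so the test does not depend on the
-- engine default (truncate(16)).
ALTER TABLE test SET TBLPROPERTIES (
  'write.metadata.metrics.column.data' = 'truncate(16)'
);

-- For the full-metrics case, where pushdown on the string column should work:
ALTER TABLE test SET TBLPROPERTIES (
  'write.metadata.metrics.column.data' = 'full'
);
```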
```java
List<Object[]> explain = sql("EXPLAIN " + select, tableName);
String explainString = explain.get(0)[0].toString();
boolean explainContainsPushDownAggregates = false;
if (explainString.contains("count(*)".toLowerCase(Locale.ROOT))
```
It's the explain string that needs to be lower cased in these tests as well.
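To illustrate the point: lowercasing the literal "count(*)" is a no-op, so it is the explain output (the haystack) that needs normalizing before contains(). The helper below is a hypothetical sketch, not the test's actual code:

```java
import java.util.Locale;

// Sketch: normalize the explain string once, then match lowercase needles.
class ExplainCheck {
  static boolean containsPushedAggregate(String explainString) {
    String normalized = explainString.toLowerCase(Locale.ROOT);
    return normalized.contains("count(*)")
        || normalized.contains("max(")
        || normalized.contains("min(");
  }
}
```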
rdblue left a comment:
There are a couple of minor issues with the use of string contains in the tests, but overall I think this is correct and almost ready to go in. Thanks, @huaxingao for getting this done!
@huaxingao, I merged this PR, since we can fix some of the minor issues in follow-ups and I want to make sure this is in the next release. Thanks!
@rdblue Thank you very much for helping me on this PR! Really appreciate all your help!
aokolnychyi left a comment:
Great work, @huaxingao! I had a few follow-up comments. Sorry it took me so long to get back to the PR.
```java
 *
 * @return a new scan based on this with column stats
 */
default TableScan withColStats() {
```
Why do we have to add this if we already have includeColumnStats defined in Scan?
I think we should be able to use that.
Right. Sorry I didn't notice there is already an existing method.
No problem at all. I forgot about that one initially too.
(cherry picked from commit 0797b89)
This PR pushes down min/max/count to Iceberg, combining #6252 and #6405.